{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Kinesis Data Analytics for SQL Applications\n",
    "\n",
    "With Amazon Kinesis Data Analytics for SQL Applications, you can process and analyze streaming data using standard SQL. The service enables you to quickly author and run powerful SQL code against streaming sources to perform time series analytics, feed real-time dashboards, and create real-time metrics.\n",
    "\n",
    "The service supports ingesting data from Amazon Kinesis Data Streams and Amazon Kinesis Data Firehose streaming sources. Then, you author your SQL code using the interactive editor and test it with live streaming data. You can also configure destinations where you want Kinesis Data Analytics to send the results."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Create AWS Lambda Function as Kinesis Data Analytics Destination\n",
    "\n",
    "Kinesis Data Analytics supports Amazon Kinesis Data Firehose, AWS Lambda, and Amazon Kinesis Data Streams as destinations. Let's create a Lambda function to publish our SQL application results as custom metrics to CloudWatch Metrics."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<img src=\"img/kinesis-analytics-lambda.png\" width=\"80%\" align=\"left\">"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import boto3\n",
    "import sagemaker\n",
    "import pandas as pd\n",
    "import json\n",
    "\n",
    "sess = sagemaker.Session()\n",
    "bucket = sess.default_bucket()\n",
    "role = sagemaker.get_execution_role()\n",
    "region = boto3.Session().region_name\n",
    "\n",
    "iam = boto3.Session().client(service_name=\"iam\", region_name=region)\n",
    "sts = boto3.Session().client(service_name=\"sts\", region_name=region)\n",
    "account_id = sts.get_caller_identity()[\"Account\"]\n",
    "\n",
    "lam = boto3.Session().client(service_name=\"lambda\", region_name=region)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Retrieve AWS Lambda Function Name"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r lambda_fn_name_cloudwatch"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    lambda_fn_name_cloudwatch\n",
    "except NameError:\n",
    "    print(\"+++++++++++++++++++++++++++++++\")\n",
    "    print(\"[ERROR] Please run all previous notebooks in this section before you continue.\")\n",
    "    print(\"+++++++++++++++++++++++++++++++\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(lambda_fn_name_cloudwatch)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Check IAM Roles Are In Place"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r iam_lambda_role_name"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    iam_lambda_role_name\n",
    "except NameError:\n",
    "    print(\"+++++++++++++++++++++++++++++++\")\n",
    "    print(\"[ERROR] Please run all previous notebooks in this section before you continue.\")\n",
    "    print(\"+++++++++++++++++++++++++++++++\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(iam_lambda_role_name)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r iam_lambda_role_passed"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    iam_lambda_role_passed\n",
    "except NameError:\n",
    "    print(\"+++++++++++++++++++++++++++++++\")\n",
    "    print(\"[ERROR] Please run all previous notebooks in this section before you continue.\")\n",
    "    print(\"+++++++++++++++++++++++++++++++\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(iam_lambda_role_passed)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if not iam_lambda_role_passed:\n",
    "    print(\"+++++++++++++++++++++++++++++++\")\n",
    "    print(\"[ERROR] Please run all previous notebooks in this section before you continue.\")\n",
    "    print(\"+++++++++++++++++++++++++++++++\")\n",
    "else:\n",
    "    print(\"[OK]\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r iam_role_lambda_arn"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    iam_role_lambda_arn\n",
    "except NameError:\n",
    "    print(\"+++++++++++++++++++++++++++++++\")\n",
    "    print(\"[ERROR] Please run all previous notebooks in this section before you continue.\")\n",
    "    print(\"+++++++++++++++++++++++++++++++\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(iam_role_lambda_arn)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Review Lambda Function Code "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "!pygmentize src/deliver_metrics_to_cloudwatch.py"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Zip The Function Code"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!zip src/DeliverKinesisAnalyticsToCloudWatch.zip src/deliver_metrics_to_cloudwatch.py"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Load the .zip File as Binary Code"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "with open(\"src/DeliverKinesisAnalyticsToCloudWatch.zip\", \"rb\") as f:\n",
    "    code = f.read()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Create The Lambda Function"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from botocore.exceptions import ClientError\n",
    "\n",
    "try:\n",
    "    response = lam.create_function(\n",
    "        FunctionName=\"{}\".format(lambda_fn_name_cloudwatch),\n",
    "        Runtime=\"python3.9\",\n",
    "        Role=\"{}\".format(iam_role_lambda_arn),\n",
    "        Handler=\"src/deliver_metrics_to_cloudwatch.lambda_handler\",\n",
    "        Code={\"ZipFile\": code},\n",
    "        Description=\"Deliver output records from Kinesis Analytics application to CloudWatch.\",\n",
    "        Timeout=900,\n",
    "        MemorySize=128,\n",
    "        Publish=True,\n",
    "    )\n",
    "    print(\"Lambda Function {} successfully created.\".format(lambda_fn_name_cloudwatch))\n",
    "\n",
    "except ClientError as e:\n",
    "    if e.response[\"Error\"][\"Code\"] == \"ResourceConflictException\":\n",
    "        response = lam.update_function_code(\n",
    "            FunctionName=\"{}\".format(lambda_fn_name_cloudwatch), ZipFile=code, Publish=True, DryRun=False\n",
    "        )\n",
    "        print(\"Updating existing Lambda Function {}.  This is OK.\".format(lambda_fn_name_cloudwatch))\n",
    "    else:\n",
    "        print(\"Error: {}\".format(e))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "response = lam.get_function(FunctionName=lambda_fn_name_cloudwatch)\n",
    "\n",
    "lambda_fn_arn_cloudwatch = response[\"Configuration\"][\"FunctionArn\"]\n",
    "print(lambda_fn_arn_cloudwatch)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "%store lambda_fn_arn_cloudwatch"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Review Lambda Function"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from IPython.core.display import display, HTML\n",
    "\n",
    "display(\n",
    "    HTML(\n",
    "        '<b>Review <a target=\"blank\" href=\"https://console.aws.amazon.com/lambda/home?region={}#/functions/{}\"> Lambda Function</a></b>'.format(\n",
    "            region, lambda_fn_name_cloudwatch\n",
    "        )\n",
    "    )\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Store Variables for Next Notebooks"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Release Resources"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%html\n",
    "\n",
    "<p><b>Shutting down your kernel for this notebook to release resources.</b></p>\n",
    "<button class=\"sm-command-button\" data-commandlinker-command=\"kernelmenu:shutdown\" style=\"display:none;\">Shutdown Kernel</button>\n",
    "        \n",
    "<script>\n",
    "try {\n",
    "    els = document.getElementsByClassName(\"sm-command-button\");\n",
    "    els[0].click();\n",
    "}\n",
    "catch(err) {\n",
    "    // NoOp\n",
    "}    \n",
    "</script>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%javascript\n",
    "\n",
    "try {\n",
    "    Jupyter.notebook.save_checkpoint();\n",
    "    Jupyter.notebook.session.delete();\n",
    "}\n",
    "catch(err) {\n",
    "    // NoOp\n",
    "}"
   ]
  }
 ],
 "metadata": {
  "instance_type": "ml.t3.medium",
  "kernelspec": {
   "display_name": "Python 3 (Data Science)",
   "language": "python",
   "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
